package com.atuguigu.flink.Day04;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.HashSet;

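// UV: Unique Visitor
// De-duplicating PV (page view) records by user ID yields UV.
// This job counts the distinct visiting users in each one-hour window.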
public class Example3 {
    /**
     * Builds and runs the hourly unique-visitor (UV) job.
     *
     * <p>Pipeline: read CSV lines, parse each into a {@link UserBehavior}, keep only records whose
     * behavior type is {@code "pv"}, assign event-time timestamps, send every record to one
     * constant key, and count the distinct user ids per one-hour tumbling event-time window.
     * Each window result is printed.
     *
     * @param args optional; when present, {@code args[0]} is used as the input CSV path. With no
     *             arguments the built-in default path is used, as before.
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Let callers supply the input file instead of depending on one machine's absolute path.
        final String inputPath = (args != null && args.length > 0)
                ? args[0]
                : "E:\\data\\UserBehavior.csv";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<UserBehavior> pvStream = env
                .readTextFile(inputPath)
                .map(
                        new MapFunction<String, UserBehavior>() {
                            @Override
                            public UserBehavior map(String value) throws Exception {
                                // Five comma-separated columns; the last one is numeric.
                                String[] arr = value.split(",");
                                // The last column is multiplied by 1000 before being used as the event
                                // timestamp (presumably epoch seconds in the file -- confirm against the data).
                                return new UserBehavior(arr[0], arr[1], arr[2], arr[3], Long.parseLong(arr[4]) * 1000L);
                            }
                        }
                )
                // Constant-first equals so a record without a behavior type is dropped, not an NPE.
                .filter(r -> "pv".equals(r.behaviortype))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehavior>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                            @Override
                            public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        pvStream
                // Anonymous class (not a lambda) keeps the Tuple2 type arguments visible to Flink.
                .map(new MapFunction<UserBehavior, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(UserBehavior value) throws Exception {
                        // Every record gets the same key, so one window instance sees all users.
                        return Tuple2.of("key", value.userId);
                    }
                })
                .keyBy(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .aggregate(new CountAgg(), new WindowResult())
                .print();

        env.execute();
    }

    /**
     * Turns the aggregated count of a window into a printable line containing the window's
     * start time, end time and the count.
     *
     * <p>The count received here is the number of distinct user ids produced by {@code CountAgg},
     * so the line is labelled as UV rather than PV.
     */
    public static class WindowResult extends ProcessWindowFunction<Long, String, String, TimeWindow> {

        /**
         * Emits one formatted line for the window.
         *
         * @param key      the window key (unused in the output)
         * @param context  gives access to the window whose bounds are printed
         * @param elements the aggregated result; only its first element is read
         * @param out      collector receiving the formatted line
         */
        @Override
        public void process(String key, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
            TimeWindow window = context.window();
            Long uniqueVisitors = elements.iterator().next();
            out.collect("窗口" + new Timestamp(window.getStart())
                    + "==>" + new Timestamp(window.getEnd())
                    + "的uv数据是" + uniqueVisitors);
        }
    }

    /**
     * Counts distinct values of the tuple's second field ({@code f1}) by collecting them in a
     * {@link HashSet} and reporting the set's size.
     *
     * <p>Note: the accumulator holds every distinct value seen in the window, so memory use grows
     * with the number of distinct values.
     */
    public static class CountAgg implements AggregateFunction<Tuple2<String, String>, HashSet<String>, Long> {

        /** @return a new, empty set of seen values */
        @Override
        public HashSet<String> createAccumulator() {
            return new HashSet<>();
        }

        /**
         * Records {@code value.f1} in the accumulator; duplicates are ignored by the set.
         *
         * @return the same accumulator instance, updated
         */
        @Override
        public HashSet<String> add(Tuple2<String, String> value, HashSet<String> accumulator) {
            accumulator.add(value.f1);
            return accumulator;
        }

        /** @return the number of distinct values collected so far */
        @Override
        public Long getResult(HashSet<String> accumulator) {
            return (long) accumulator.size();
        }

        /**
         * Combines two partial accumulators into their set union.
         *
         * <p>Previously returned {@code null}, which would break any window assigner that merges
         * windows (for example session windows).
         *
         * @return {@code a}, after all elements of {@code b} have been added to it
         */
        @Override
        public HashSet<String> merge(HashSet<String> a, HashSet<String> b) {
            a.addAll(b);
            return a;
        }
    }

    // POJO class holding one parsed user-behavior record
    /**
     * One user-behavior record: user id, item id, category id, behavior type and a timestamp.
     *
     * <p>The fields stay public and mutable and the no-argument constructor stays public so the
     * class keeps its plain-POJO shape. Field names (including the spelling {@code iteamId}) are
     * left as they are because they are part of the class's public surface.
     */
    public static class UserBehavior {
        public String userId;
        public String iteamId;
        public String categoryid;
        public String behaviortype;
        /** Timestamp as a {@code long} value; {@code null} until set. */
        public Long timestamp;

        /** Creates an empty record with all fields {@code null}. */
        public UserBehavior() {
        }

        /** Creates a record with every field set from the given arguments. */
        public UserBehavior(String userId, String iteamId, String categoryid, String behaviortype, Long timestamp) {
            this.userId = userId;
            this.iteamId = iteamId;
            this.categoryid = categoryid;
            this.behaviortype = behaviortype;
            this.timestamp = timestamp;
        }

        /**
         * Renders all fields; the timestamp is shown as a {@link Timestamp}.
         *
         * <p>A {@code null} timestamp is printed as {@code null} instead of throwing a
         * {@link NullPointerException} from unboxing.
         */
        @Override
        public String toString() {
            Timestamp shown = (timestamp == null) ? null : new Timestamp(timestamp);
            return "UserBehavior{" +
                    "userId='" + userId + '\'' +
                    ", iteamId='" + iteamId + '\'' +
                    ", categoryid='" + categoryid + '\'' +
                    ", behaviortype='" + behaviortype + '\'' +
                    ", timestamp=" + shown +
                    '}';
        }
    }
}
